package com.meida.msg.provider.websocket;

import com.alibaba.fastjson.util.TypeUtils;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.meida.common.base.entity.EntityMap;
import com.meida.common.base.utils.FlymeUtils;
import com.meida.common.constants.CommonConstants;
import com.meida.common.constants.QueueConstants;
import com.meida.common.utils.DateUtils;
import com.meida.common.utils.JsonUtils;
import com.meida.msg.client.entity.MsgContent;
import com.meida.msg.client.entity.MsgReceive;
import com.meida.msg.client.entity.MsgType;
import com.meida.msg.provider.service.MsgContentService;
import com.meida.msg.provider.service.MsgReceiveService;
import com.meida.msg.provider.service.MsgTypeService;
import com.meida.starter.rabbitmq.client.RabbitMqClient;
import com.meida.starter.rabbitmq.config.RabbitComponent;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @author
 */
@ServerEndpoint("/websocket/{userId}")//此注解相当于设置访问URL
@Component("webSocketServerEndpoint")
@Slf4j
public class WebSocketServerEndpoint {

    @Autowired
    public RabbitMqClient rabbitMqClient;
    @Autowired
    public MsgContentService msgContentService;
    @Autowired
    public MsgReceiveService msgReceiveService;
    @Autowired
    public MsgTypeService msgTypeService;

    private Long userId;

    //与某个客户端的连接会话，需要通过它来给客户端发送数据
    private Session session;

    private static Map<Long, Session> sessionPool = new HashMap<Long, Session>();

    //concurrent包的线程安全Set，用来存放每个客户端对应的WebSocket对象。
    private static CopyOnWriteArraySet<WebSocketServerEndpoint> webSocketSet = new CopyOnWriteArraySet<>();

    /**
     * 建立连接成功
     *
     * @param session
     * @param userId
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "userId") Long userId) {
        try {
            this.session = session;
            this.userId = userId;
            webSocketSet.add(this);
            sessionPool.put(userId, session);
            log.info("【websocket消息】有新的连接，总数为:" + webSocketSet.size());
        } catch (Exception e) {
        }
    }

    /**
     * 连接关闭
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);
        sessionPool.remove(this.userId);
        log.info("【websocket消息】 连接断开，总数{}", webSocketSet.size());
    }

    /**
     * 接收客户端消息
     *
     * @param message
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("【websocket消息】 收到客户端发来的消息：{}", message);
    }

    /**
     * 服务端推送消息
     *
     * @param userId
     * @param message
     */
    public void pushMessage(String userId, String message) {
        Session session = sessionPool.get(userId);
        if (session != null && session.isOpen()) {
            try {
                log.info("【websocket消息】 单点消息:" + message);
                session.getAsyncRemote().sendText(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 服务端推送消息
     *
     * @param userId
     * @param sendType
     * @param map
     */
    public void pushMessage(Long userId, Integer sendType, Map<String, Object> map) {
        List<Long> userIds=new ArrayList<>();
        userIds.add(userId);
        boolean save = initMsgData(userIds, sendType, map);
        Session session = sessionPool.get(userId);
        initTypeCode(map);
        String jsonStr = JsonUtils.beanToJson(map);
        if (session != null && session.isOpen()) {
            try {
                log.info("【websocket消息】 单点消息:" + jsonStr);
                if (save) {
                    session.getAsyncRemote().sendText(jsonStr);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 服务端推送消息
     *
     * @param userIdList
     * @param map
     */
    public void pushMessage(List<Long> userIdList, Integer sendType, Map<String, Object> map) {
        boolean save = initMsgData(userIdList, sendType, map);
        for (Long aLong : userIdList) {
            Session session = sessionPool.get(aLong);
            initTypeCode(map);
            String jsonStr = JsonUtils.beanToJson(map);
            if (session != null && session.isOpen()) {
                try {
                    log.info("【websocket消息】 单点消息:" + jsonStr);
                    if (save) {
                        session.getAsyncRemote().sendText(jsonStr);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 服务器端推送消息
     */
    public void pushMessage(String message) {
        try {
            webSocketSet.forEach(ws -> ws.session.getAsyncRemote().sendText(message));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 服务器端推送消息(全推)
     */
    public void pushMessage(Map<String, Object> map, Integer sendType) {
        try {
            String jsonStr = JsonUtils.beanToJson(map);
            List<Long> userIds=new ArrayList<>();
            webSocketSet.forEach(ws -> {
                Long userId = ws.userId;
                userIds.add(userId);
                ws.session.getAsyncRemote().sendText(jsonStr);
            });
            initMsgData(userIds, sendType, map);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 后台发送消息到rabbitMQ
     *
     * @param msgContent
     * @param receiveUserIdList
     * @param sendType          系统公告1，个人消息2
     */
    public void sendMessage(MsgContent msgContent, List<Long> receiveUserIdList, Integer sendType) {
        log.info("【websocket消息】广播消息:" + msgContent.getMsgContent());
        Map<String, Object> map = JsonUtils.beanToMap(msgContent);
        map.put("receiveUserIds", receiveUserIdList);
        map.put("sendType", sendType);
        rabbitMqClient.sendMessage(QueueConstants.QUEUE_WEBSOCKET_MSG, map, 2);
    }

    /**
     * 后台发送消息到rabbitMQ
     *
     * @param msgTypeId
     * @param receiveUserIdList
     * @param sendType          系统公告1，个人消息2
     */
    public void sendMessage(Long msgTypeId,String title,String content, List<Long> receiveUserIdList, Integer sendType) {
        MsgContent msgContent=new MsgContent();
        msgContent.setMsgTypeId(msgTypeId);
        msgContent.setMsgContent(content);
        msgContent.setMsgId(IdWorker.getId());
        msgContent.setDeleted(CommonConstants.DEL_NO);
        msgContent.setMsgTitle(title);
        log.info("【websocket消息】广播消息:" + msgContent.getMsgContent());
        Map<String, Object> map = JsonUtils.beanToMap(msgContent);
        map.put("receiveUserIds", receiveUserIdList);
        map.put("sendType", sendType);
        EntityMap result=new EntityMap();
        result.putAll(map);
        rabbitMqClient.sendMessage(QueueConstants.QUEUE_WEBSOCKET_MSG, result);
    }

    /**
     * 此为单点消息
     *
     * @param msgContent
     * @param userId
     * @param sendType   系统公告1，个人消息2
     */
    public void sendMessage(MsgContent msgContent, String userId, Integer sendType) {
        Map<String, Object> map = JsonUtils.beanToMap(msgContent);
        map.put("receiveUserIds", userId);
        map.put("sendType", sendType);
        rabbitMqClient.sendMessage(QueueConstants.QUEUE_WEBSOCKET_MSG, map, 2);
    }



    /**
     * 记录信息到数据库
     *
     * @param userId
     * @param sendType
     * @param map
     */
    public Boolean initMsgData(List<Long> userIds, Integer sendType, Map<String, Object> map) {
        String json = JsonUtils.beanToJson(map);
        MsgContent msgContent = JsonUtils.jsonToBean(json, MsgContent.class);
        List<MsgReceive> list=new ArrayList<>();
        for (Long id : userIds) {
            Long msgId = msgContent.getMsgId();
            MsgReceive msgReceive = new MsgReceive();
            msgReceive.setMsgId(msgId);
            msgReceive.setReceiveUserId(id);
            msgReceive.setDeleted(CommonConstants.DEL_NO);
            msgReceive.setReadState(CommonConstants.DISABLED);
            msgReceive.setCreateTime(DateUtils.getNowDateTime());
            list.add(msgReceive);
        }
        boolean save = msgReceiveService.saveBatch(list);
        if (save) {
            if (sendType == 2) {
                msgContent.setCreateTime(DateUtils.getNowDateTime());
                save = msgContentService.save(msgContent);
            }
        }
        return save;
    }

    /**
     * initTypeCode
     *
     * @param map
     */
    public Map<String, Object> initTypeCode(Map<String, Object> map) {
        Long msgTypeId = TypeUtils.castToLong(map.get("msgTypeId"));
        MsgType msgType = msgTypeService.getById(msgTypeId);
        map.put("typeCode", "");
        if (FlymeUtils.isNotEmpty(msgType)) {
            map.put("typeCode", msgType.getTypeCode());
        }
        return map;
    }
}
